package com.dec.kks.etl;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Spark Streaming driver that consumes newline-delimited text from a TCP
 * socket on {@code localhost:9999} and prints each 5-second micro-batch
 * to stdout. Run a producer such as {@code nc -lk 9999} to feed it.
 */
public class TCPSocketMain {

    public static void main(String[] args) throws Exception {
        // Set before Spark spins up so its Hadoop integration can find the
        // native binaries (winutils-style lookup) during context creation.
        System.setProperty("hadoop.home.dir", "/home/hdfs/bigdata/hadoop-2.7.4");

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("tcp with socket source");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Custom byte-stream converter: decode the socket bytes as UTF-8 and
        // emit one record per line. The previous implementation read at most
        // 1024 bytes once, ignored the read count (which may be -1 at EOF),
        // printed 10 raw bytes regardless of how many were read, and always
        // returned an empty list — so the stream never produced any records.
        JavaReceiverInputDStream<String> lines = jssc.socketStream("localhost", 9999,
                new Function<InputStream, Iterable<String>>() {
            @Override
            public Iterable<String> call(InputStream in) throws Exception {
                List<String> records = new ArrayList<String>();
                // Explicit charset: pre-Java-18 the platform default would
                // otherwise be used, making decoding host-dependent.
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    records.add(line);
                }
                // The receiver owns the socket lifecycle; it closes the
                // stream when the connection ends, so no close() here.
                return records;
            }
        }, StorageLevel.MEMORY_AND_DISK_2());

        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
